package com.atuguigu.flink.Day06;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

//订单超时检测
public class Example1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        
        env
                .socketTextStream("localhost",9999)
                .map(
                        new MapFunction<String, Tuple3<String,String,Long>>() {
                            @Override
                            public Tuple3<String, String, Long> map(String value) throws Exception {
                                String [] arr=value.split(" ");
                                return Tuple3.of(arr[0],arr[1],Long.parseLong(arr[2]) * 1000L);
                            }
                        }
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {
                                return element.f2;
                            }
                        })
                )
                .keyBy(r->r.f0)
                .process(new KeyedProcessFunction<String, Tuple3<String, String, Long>, String>() {
                    private ValueState<Tuple3<String,String,Long>> state;
                    private ValueState<Boolean> isTimeout;
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        state=getRuntimeContext().getState(new ValueStateDescriptor<Tuple3<String,String,Long>>("state", Types.TUPLE(Types.STRING,Types.STRING,Types.LONG)));
                        isTimeout=getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("isTimeout",Types.BOOLEAN));
                    }

                    @Override
                    public void processElement(Tuple3<String, String, Long> value, Context ctx, Collector<String> out) throws Exception {
                            if(value.f1.equals("create")){
                                //证明创建了一个下单，但5s内是否支付不可知，所以将信息，保存再状态里面
                                if(state.value() == null){
                                    ctx.timerService().registerEventTimeTimer(value.f2 + 5000L);
                                    state.update(value);//将create的信息保存
                                }
                            }else if(value.f1.equals("pay")){
                                if(isTimeout.value() !=null && isTimeout.value()){
                                    out.collect("订单:" +value.f0 + "已经失效了");
                                }else {
                                    state.update(value);
                                    out.collect("订单：" +value.f0 +"支付成功");
                                }

                            }
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        super.onTimer(timestamp, ctx, out);
                        if(state.value() != null && state.value().f1.equals("create")){
                            out.collect("订单:" +state.value().f0 +"已经超过5s支付时间");
                            isTimeout.update(true);
                        }
                        state.clear();


                    }
                })
                .print();
        
        
        
        
        
        
        
        
        
        
        env.execute();

    }
}
